D:\a\scloud-dns\scloud-dns\src\workers\manager\channels_generation.rs
Line | Count | Source |
1 | | use crate::exceptions::SCloudException; |
2 | | use crate::workers::{SCloudWorker, WorkerType}; |
3 | | use std::collections::HashMap; |
4 | | use std::sync::Arc; |
5 | | use tokio::sync::mpsc; |
6 | | |
7 | 0 | pub(crate) async fn generate_channels( |
8 | 0 | workers: Vec<Arc<SCloudWorker>>, |
9 | 0 | ) -> Result<(), SCloudException> { |
10 | 0 | let mut wl: HashMap<&str, Vec<Arc<SCloudWorker>>> = HashMap::new(); |
11 | 0 | for w in workers { |
12 | 0 | let key = match &w.get_worker_type() { |
13 | 0 | WorkerType::LISTENER => "listener", |
14 | 0 | WorkerType::DECODER => "decoder", |
15 | 0 | WorkerType::QUERY_DISPATCHER => "query-dispatcher", |
16 | 0 | WorkerType::CACHE_LOOKUP => "cache-lookup", |
17 | 0 | WorkerType::ZONE_MANAGER => "zone-manager", |
18 | 0 | WorkerType::RESOLVER => "resolver", |
19 | 0 | WorkerType::CACHE_WRITER => "cache-writer", |
20 | 0 | WorkerType::ENCODER => "encoder", |
21 | 0 | WorkerType::SENDER => "sender", |
22 | 0 | WorkerType::CACHE_JANITOR => "cache-janitor", |
23 | 0 | WorkerType::METRICS => "metrics", |
24 | 0 | WorkerType::TCP_ACCEPTOR => "tcp-acceptor", |
25 | 0 | WorkerType::NONE => "none", |
26 | | }; |
27 | 0 | wl.entry(key).or_insert_with(Vec::new).push(Arc::clone(&w)); |
28 | | } |
29 | | |
30 | 0 | let default_worker = vec![Arc::new(SCloudWorker::new(WorkerType::NONE)?)]; |
31 | 0 | let tcp_acceptor = wl.get("tcp-acceptor").unwrap_or(&default_worker); |
32 | 0 | let decoder = wl.get("decoder").unwrap_or(&default_worker); |
33 | 0 | let query_dispatcher = wl.get("query-dispatcher").unwrap_or(&default_worker); |
34 | 0 | let cache_lookup = wl.get("cache-lookup").unwrap_or(&default_worker); |
35 | 0 | let cache_writers = wl.get("cache-writer").unwrap_or(&default_worker); |
36 | 0 | let zone_manager = wl.get("zone-manager").unwrap_or(&default_worker); |
37 | 0 | let resolvers = wl.get("resolver").unwrap_or(&default_worker); |
38 | 0 | let encoders = wl.get("encoder").unwrap_or(&default_worker); |
39 | 0 | let senders = wl.get("sender").unwrap_or(&default_worker); |
40 | | |
41 | | // Helper: wire N producers -> M consumers |
42 | | // Each producer gets M senders, each consumer gets N receivers |
43 | 0 | async fn wire( |
44 | 0 | producers: &[Arc<SCloudWorker>], |
45 | 0 | consumers: &[Arc<SCloudWorker>], |
46 | 0 | capacity: usize, |
47 | 0 | ) { |
48 | 0 | for p in producers { |
49 | 0 | let mut txs = Vec::new(); |
50 | 0 | for c in consumers { |
51 | 0 | let (tx, rx) = mpsc::channel(capacity); |
52 | 0 | c.push_dns_rx(rx).await; |
53 | 0 | txs.push(tx); |
54 | | } |
55 | 0 | p.push_dns_tx_many(txs).await; |
56 | | } |
57 | 0 | } |
58 | | |
59 | 0 | wire(tcp_acceptor, decoder, 1024).await; |
60 | 0 | wire(decoder, cache_lookup, 1024).await; |
61 | 0 | wire(cache_lookup, cache_writers, 1024).await; // tx[0] = miss path |
62 | 0 | wire(cache_lookup, query_dispatcher, 1024).await; // tx[1] = hit path |
63 | 0 | wire(query_dispatcher, zone_manager, 1024).await; |
64 | 0 | wire(query_dispatcher, resolvers, 1024).await; |
65 | 0 | wire(zone_manager, cache_writers, 1024).await; |
66 | 0 | wire(resolvers, cache_writers, 1024).await; |
67 | 0 | wire(cache_writers, encoders, 1024).await; |
68 | 0 | wire(encoders, senders, 1024).await; |
69 | | |
70 | 0 | Ok(()) |
71 | 0 | } |